package com.atguigu.flink.chapter6;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashSet;
import java.util.Set;

/**
 * Streaming UV (unique visitor) count over {@code input/UserBehavior.csv}.
 * <p>
 * Each input line is split on {@code ","} and converted with
 * {@code new UserBehavior(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4]))}.
 * Only records whose behavior equals {@code "pv"} take part in the count.
 * <p>
 * An earlier version of this job chained {@code keyBy(userId).max(1).keyBy(f1).sum(1)}.
 * That does not yield UV: {@code max} is a rolling aggregation that emits an updated
 * record for every incoming element, so the downstream {@code sum} counted page views
 * rather than distinct users. The job now de-duplicates user ids in a set instead.
 *
 * @Author lizhenchao@atguigu.cn
 * @Date 2020/12/21 14:35
 */
public class Flink02_Project_UV_2 {
    /**
     * Builds and runs the job: read the CSV, keep "pv" records, and print the running
     * number of distinct user ids each time a previously unseen user shows up.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env
          .readTextFile("input/UserBehavior.csv")
          .flatMap(new FlatMapFunction<String, Tuple2<Long, Long>>() {
              @Override
              public void flatMap(String value, Collector<Tuple2<Long, Long>> out) throws Exception {
                  String[] split = value.split(",");
                  UserBehavior behavior = new UserBehavior(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4]));
                  // Emit (userId, 1L) for page views only; everything else is dropped.
                  if ("pv".equals(behavior.getBehavior())) {
                      out.collect(Tuple2.of(behavior.getUserId(), 1L));
                  }
              }
          })
          // f1 is the constant 1L on every tuple, so all page views land on the same key
          // and are therefore handled by a single instance of the function below.
          .keyBy(t -> t.f1)
          .process(new KeyedProcessFunction<Long, Tuple2<Long, Long>, Long>() {
              // Plain instance field, not Flink managed state: it is not part of checkpoints.
              // Acceptable here because the job reads one bounded file and then finishes.
              private final Set<Long> seenUserIds = new HashSet<>();

              @Override
              public void processElement(Tuple2<Long, Long> value, Context ctx, Collector<Long> out) throws Exception {
                  // Set.add returns true only for ids not seen before, so the UV value
                  // is emitted exactly when it changes.
                  if (seenUserIds.add(value.f0)) {
                      out.collect((long) seenUserIds.size());
                  }
              }
          })
          .print();
        env.execute();
    }
}
/*
Approach 1: put every user id into a Set and take the size of that Set.
 */